package com.yz.kronos.schedule.client;

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.yz.kronos.schedule.model.KronosConfig;
import io.fabric8.kubernetes.api.model.*;
import io.fabric8.kubernetes.api.model.batch.Job;
import io.fabric8.kubernetes.api.model.batch.JobBuilder;
import io.fabric8.kubernetes.api.model.batch.JobSpec;
import io.fabric8.kubernetes.api.model.batch.JobSpecBuilder;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.yz.kronos.common.Constant.EXECUTOR;
import static com.yz.kronos.common.Constant.SHARE_TOTAL;

/**
 * @author shanchong
 * @date 2019-12-20
 **/
@Slf4j
@Service
public class KubernetesClient {

    public static String KRONOS_VOLUME_NAME = "kronos-logfile";
    @Autowired
    private KronosConfig kronosConfig;

    /**
     * 调用k8s的api生成pod
     * @param execId 执行id
     * @param shareTotal 分片数量
     * @param resources 资源
     * @param image docker镜像
     * @param cmd 启动命令
     */
    public void handle(String namespace, final String execId, final int shareTotal, final String resources,
                        final String image, final String cmd, String executor) {
        String serviceApi = kronosConfig.getServiceApi();
        String imagePullPolicy = kronosConfig.getImagePullPolicy();
        String logPath = kronosConfig.getLogPath();
        String restartPolicy = kronosConfig.getRestartPolicy();
        String groupName = kronosConfig.getGroupName();
        String apiVersion = kronosConfig.getApiVersion();
        String apiVersion1 = groupName + "/" + apiVersion;
        String mountPath = logPath + "/" + namespace;
        String podLogPath = mountPath + "/" + execId;

        String containerName = "kronos-container";
        List<EnvVar> envVars = getEnvVarList(execId, executor, shareTotal);
        Map<String, String> labels = getLabels(execId);
        Map<String, Quantity> requests = getResources(resources);
        ObjectMeta objectMeta = new ObjectMetaBuilder().withNamespace(namespace).withLabels(labels).withName(execId).build();
        Container container = new ContainerBuilder()
                .withSecurityContext(new SecurityContextBuilder().withRunAsNonRoot(false).withAllowPrivilegeEscalation(true).build())
                .withName(containerName)
                .withResources(new ResourceRequirementsBuilder().withRequests(requests).build())
                .withImage(image)
                .withEnv(envVars)
                .addToCommand(cmd)
                .addToArgs(kronosConfig.getSchedulerUrl()+"/queue/index?flowId=" + execId)
                .withImagePullPolicy(imagePullPolicy)
                .withVolumeMounts(new VolumeMountBuilder().withName(KRONOS_VOLUME_NAME).withMountPath(mountPath).build())
                .build();
        Volume volumes = new VolumeBuilder().withName(KRONOS_VOLUME_NAME).withHostPath(new HostPathVolumeSourceBuilder().withPath(podLogPath).build()).build();
        JobSpec jobSpec = new JobSpecBuilder()
                .withCompletions(shareTotal)
                .withParallelism(shareTotal)
                .withTemplate(new PodTemplateSpecBuilder()
                        .withSpec(new PodSpecBuilder().withContainers(container).withVolumes(volumes).withRestartPolicy(restartPolicy).build())
                        .withMetadata(objectMeta).build())
                .build();
        Job job = new JobBuilder().withKind("Job").withApiVersion(apiVersion1).withMetadata(objectMeta).withSpec(jobSpec).build();
        log.info("request kubernetes api : {}", job);
        Config config = new ConfigBuilder()
                .withMasterUrl(serviceApi)
                .withTrustCerts(true)
                .build();
        DefaultKubernetesClient defaultKubernetesClient = new DefaultKubernetesClient(config);
        Job job1 = defaultKubernetesClient.inNamespace(namespace).batch().jobs().create(job);
        log.info("{}", job1);
    }

    private Map<String, Quantity> getResources(String resources) {
        JSONObject jsonObject = JSONUtil.parseObj(resources);
        Map<String, Quantity> resourcesMap = new HashMap<>();
        jsonObject.forEach((name,value)-> resourcesMap.put(name, new QuantityBuilder().withAmount(value.toString()).build()));
        return resourcesMap;
    }

    public List<EnvVar> getEnvVarList(String execId, String executor, int shareTotal) {
        return Arrays.asList(new EnvVarBuilder()
                        .withName(EXECUTOR).withValue(executor)
                        .build(),
                new EnvVarBuilder()
                        .withName(SHARE_TOTAL).withValue(String.valueOf(shareTotal))
                        .build()
        );
    }

    public Map<String, String> getLabels(String id) {
        return new HashMap<>(8);
    }
}
